Flink Table Store与Spark集成

E-MapReduce的Flink Table Store服务支持通过Spark SQL对Flink Table Store进行读写操作。本文通过示例为您介绍如何通过Spark SQL对Flink Table Store进行读写操作。

使用限制

  • 仅EMR-3.45.0版本、EMR-5.11.0版本的集群,支持Spark SQL和Spark CLI对Flink Table Store进行读写操作。

  • 仅Spark3的Spark SQL可以通过Catalog读写Flink Table Store。

  • Spark CLI只能通过文件系统或对象存储的路径读取Flink Table Store。

操作步骤

步骤一:创建Catalog

Flink Table Store将数据和元数据都保存在文件系统或对象存储中,存储的根路径由spark.sql.catalog.tablestore.warehouse参数指定。如果指定的warehouse路径不存在,将会自动创建该路径;如果指定的warehouse路径存在,您可以通过该Catalog访问路径中已有的表。

您还可以同步元数据到Hive或DLF中,方便其它服务访问Flink Table Store。

  • 创建Filesystem Catalog。

    Filesystem Catalog仅将元数据保存在文件系统或对象存储中。

    spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog --conf spark.sql.catalog.tablestore.metastore=filesystem --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
  • 创建Hive Catalog

    Hive Catalog会同步元数据到Hive MetaStore中。在Hive Catalog中创建的表可以直接在Hive中查询。

    Hive查询Flink Table Store,详情请参见Flink Table Store与Hive集成

    spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog --conf spark.sql.catalog.tablestore.metastore=hive --conf spark.sql.catalog.tablestore.uri=thrift://master-1-1:9083 --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
    说明

    spark.sql.catalog.tablestore.uri为Hive MetaStore Service的地址。

  • 创建DLF Catalog

    DLF Catalog会同步元数据到DLF中。

    重要

    创建集群时,元数据须为DLF统一元数据

    spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog --conf spark.sql.catalog.tablestore.metastore=dlf --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse

步骤二 :通过Spark SQL读写Flink Table Store中的数据

执行以下Spark SQL语句,在Catalog中创建一张表,并读写表中的数据。

-- 在创建的Catalog中,创建并使用一个测试database。
CREATE DATABASE tablestore.test_db;
USE tablestore.test_db;

-- 创建Flink Table Store表。
CREATE TABLE test_tbl (
    uuid int,
    name string,
    price double
) TBLPROPERTIES (
    'primary-key' = 'uuid'
);

-- 向Flink Table Store中写入数据。
INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);

-- 读取表中的数据。
SELECT * FROM test_tbl;

步骤三:通过Spark CLI读Flink Table Store中的数据

  1. 执行以下命令,启动Spark CLI。

    spark-shell
  2. 在Spark CLI中运行以下Scala代码,查询指定目录下存储的Flink Table Store表。

    val dataset = spark.read.format("tablestore").load("oss://oss-bucket/warehouse/test_db.db/test_tbl")
    dataset.createOrReplaceTempView("test_tbl")
    spark.sql("SELECT * FROM test_tbl").show()